kafka知识点

您所在的位置:网站首页 kafka 消费者分区分配策略 kafka知识点

kafka知识点

#kafka知识点| 来源: 网络整理| 查看: 265

1. broker从此变成有状态的,会影响伸缩性;

2. 需要引入应答机制(acknowledgement)来确认消费成功。

3. 由于要保存很多consumer的offset信息,必然引入复杂的数据结构,造成资源浪费。

而Kafka选择了不同的方式:每个consumer group管理自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入checkpoint机制定期持久化,简化了应答机制的实现。

(2)、Kafka默认是定期帮你自动提交位移的(enable.auto.commit = true),你当然可以选择手动提交位移实现自己控制。

(3)、另外kafka会定期把group消费情况保存起来,做成一个offset map,如下图所示:

图中表明了test-group这个组当前的消费情况

老版本的位移是提交到zookeeper中的,目录结构是:/consumers//offsets//,但是zookeeper其实并不适合进行大批量的读写操作,尤其是写操作。

因此kafka提供了另一种解决方案:增加__consumeroffsets topic,将offset信息写入这个topic,摆脱对zookeeper的依赖(指保存offset这件事情)。__consumer_offsets中的消息保存了每个consumer group某一时刻提交的offset信息。依然以上图中的consumer group为例,格式大概如下:

__consumers_offsets topic配置了compact策略,使得它总是能够保存最新的位移信息,既控制了该topic总体的日志容量,也能实现保存最新offset的目的。

offset提交消息会根据消费组的key(消费组名称)进行分区. 对于一个给定的消费组,它的所有消息都会发送到唯一的broker(即Coordinator)

Coordinator上负责管理offset的组件是Offset manager。负责存储,抓取,和维护消费者的offsets. 每个broker都有一个offset manager实例. 有两种具体的实现:

ZookeeperOffsetManager: 调用zookeeper来存储和接收offset(老版本的位移管理)。

DefaultOffsetManager: 提供消费者offsets内置的offset管理。

通过在config/server.properties中的offset.storage参数选择。

DefaultOffsetManager

除了将offset作为logs保存到磁盘上,DefaultOffsetManager维护了一张能快速服务于offset抓取请求的consumer offsets表。这个表作为缓存,包含的含仅仅是”offsets topic”的partitions中属于leader partition对应的条目(存储的是offset)。

对于DefaultOffsetManager还有两个其他属性: “offsets.topic.replication.factor和”offsets.topic.num.partitions”,默认值都是1。这两个属性会用来自动地创建”offsets topic”。

offset manager接口的概要:

Offset Commit提交过程:

消费端

一条offset提交消息会作为生产请求.当消费者启动时,会为”offsets topic”创建一个消费者. 下面是内置的生产者的一些属性:

可以使用异步.但是使用同步可以避免延迟的生产请求(因为是批量消息),并且我们需要立即知道offset消息是否被broker成功接收

|request.required.acks|-1|确保所有的replicas和leader是同步的,并且能看到所有的offset消息

|key.serializer.class|StringEncoder|key和payload都是strings

注意我们没有对提交的offset消息进行压缩,因为每条消息本身大小是很小的,如果压缩了反而适得其反.

目前key和offset的值通过纯文本方式传递. 我们可以转换为更加紧凑的二进制协议,而不是把

Long类型的offset和Int类型的partition作为字符串. 当然在不断演进时还要考虑版本和格式协议.

broker端

broker把接收到的offset提交信息当做一个正常的生产请求,对offset请求的处理和正常的生产者请求处理方式是一样的.

一旦将数据追加到leader的本地日志中,并且所有的replicas都赶上leader.leader检查生产请求是”offsets topic”,

(因为broker端的处理逻辑针对offset请求和普通生产请求是一样的,如果是offset请求,还需要有不同的处理分支)

它就会要求offset manager添加这个offset(对于延迟的生产请求,更新操作会在延迟的生产请求被完成的时候).

因为设置了acks=-1,只有当这些offsets成功地复制到ISR中的所有brokers,才会被提交给offset manager.

3、Consumer Reblance

Rebalance Generation

JVM GC的分代收集就是这个词(严格来说是generational),我这里把它翻译成“届”好了,它表示了rebalance之后的一届成员,主要是用于保护consumer group,隔离无效offset提交的。比如上一届的consumer成员是无法提交位移到新一届的consumer group中。我们有时候可以看到ILLEGAL_GENERATION的错误,就是kafka在抱怨这件事情。每次group进行rebalance之后,generation号都会加1,表示group进入到了一个新的版本,如下图所示:Generation 1时group有3个成员,随后成员2退出组,coordinator触发rebalance,consumer group进入Generation 2,之后成员4加入,再次触发rebalance,group进入Generation 3.

协议(protocol)

rebalance本质上是一组协议。group与coordinator共同使用它来完成group的rebalance。目前kafka提供了5个协议来处理与consumer group coordination相关的问题:

Heartbeat请求:consumer需要定期给coordinator发送心跳来表明自己还活着

LeaveGroup请求:主动告诉coordinator我要离开consumer group

SyncGroup请求:group leader把分配方案告诉组内所有成员

JoinGroup请求:成员请求加入组

DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用

Coordinator在rebalance的时候主要用到了前面4种请求。

liveness

consumer如何向coordinator证明自己还活着?通过定时向coordinator发送Heartbeat请求。如果超过了设定的超时时间,那么coordinator就认为这个consumer已经挂了。一旦coordinator认为某个consumer挂了,那么它就会开启新一轮rebalance,并且在当前其他consumer的心跳response中添加“REBALANCE_IN_PROGRESS”,告诉其他consumer:不好意思各位,你们重新申请加入组吧!

Rebalance过程

rebalance的前提是coordinator已经确定了。

总体而言,rebalance分为2步:Join和Sync

1 Join, 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求入组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。

2 Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

注意!!consumer group的分区分配方案是在客户端执行的!Kafka将这个权利下放给客户端主要是因为这样做可以有更好的灵活性。比如这种机制下我可以实现类似于Hadoop那样的机架感知(rack-aware)分配方案,即为consumer挑选同一个机架下的分区数据,减少网络传输的开销。Kafka默认为你提供了两种分配策略:range和round-robin。由于这不是本文的重点,这里就不再详细展开了,你只需要记住你可以覆盖consumer的参数:partition.assignment.strategy来实现自己分配策略就好了。

和很多kafka组件一样,group也做了个状态机来表明组状态的流转。coordinator根据这个状态机会对consumer group做不同的处理,如下图所示

简单说明下图中的各个状态:

Dead:组内已经没有任何成员的最终状态,组的元数据也已经被coordinator移除了。这种状态响应各种请求都是一个response:UNKNOWN_MEMBER_ID

Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求

PreparingRebalance:组准备开启新的rebalance,等待成员加入

AwaitingSync:正在等待leader consumer将分配方案传给各个成员

Stable:rebalance完成!可以开始消费了

rebalance场景剖析

1 新成员加入组(member join)

2 组成员崩溃(member failure)

组成员崩溃和组成员主动离开是两个不同的场景。因为在崩溃时成员并不会主动地告知coordinator此事,coordinator有可能需要一个完整的session.timeout周期(心跳周期)才能检测到这种崩溃,这必然会造成consumer的滞后。可以说离开组是主动地发起rebalance;而崩溃则是被动地发起rebalance。如图:

3 组成员主动离组(member leave group)

4 提交位移(member commit offset)

返回搜狐,查看更多



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3